/*
 * Copyright 2013-16 Board of Trustees of Stanford University
 * Copyright 2013-16 Ecole Polytechnique Federale Lausanne (EPFL)
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>

#include <ixev.h>
#include <net/ip.h>
#include <mempool.h>

/* Capacity of the per-thread statistics arrays indexed by thread id. */
#define MAX_THREADS 40
/* Capacity of the per-connection statistics arrays (second index). */
#define MAX_CONNS_PER_CORE 1000
/*
 * Microseconds per second.  Parenthesized so the macro behaves as a single
 * value wherever it is expanded (for example as a divisor).
 */
#define US_PER_SECOND (1000 * 1000)
/*
 * Divisor that converts an rdtsc() delta into microseconds.
 * NOTE(review): hard-coded; presumably matches a 2.4 GHz TSC -- confirm for
 * the machine the benchmark runs on.
 */
#define TICKS_PER_US 2400
/* Number of latency histogram entries; equals the length of time_bin[]. */
#define SLOT_NUM 34
/* Quantile reported by the percentile latency estimate (0.99 = 99th). */
#define LATENCY_PERCENT 0.99

/* Run configuration; main() fills these in from the command line. */
static unsigned int nrequests;	/* requests per connection before it is closed */
static unsigned int msg_size;	/* bytes per request and per expected response */
static unsigned int nconn;	/* connections dialed by each thread */
static int dst_port;
static uint32_t dst_ip;

/* Only referenced from code that is currently commented out. */
static uint32_t print_size1 = 64000;
static uint32_t print_size2 = 64000;
static uint32_t num_resp[MAX_THREADS][MAX_CONNS_PER_CORE];
static uint32_t prv_num_resp[MAX_THREADS][MAX_CONNS_PER_CORE];
uint64_t cur_time[MAX_THREADS][MAX_CONNS_PER_CORE];

/* Written when a connection is created; read only by commented-out code. */
uint64_t pre_time[MAX_THREADS][MAX_CONNS_PER_CORE];

/* Per-thread response counters: running total and value at the last report. */
unsigned long thread_count[MAX_THREADS];
unsigned long prv_count[MAX_THREADS];

/* Per-thread rdtsc() timestamps: latest check and last report. */
uint64_t total_cur_time[MAX_THREADS];
uint64_t total_pre_time[MAX_THREADS];

/* Per-thread number of reports printed so far. */
int times[32];
/* Per-thread copy of the latency histogram taken at the last report. */
uint64_t pre_num[32][SLOT_NUM];

/* Requests-per-second figure from the most recent report. */
static double packet_process_speed;


/*
 * Histogram used for the percentile latency estimate.
 *
 * time_bin[] lists the bucket boundaries, in the unit obtained by dividing an
 * rdtsc() delta by TICKS_PER_US; bucket i covers time_bin[i]..time_bin[i+1]
 * and the final slot collects everything above the last boundary.
 *
 * NOTE(review): 440 appears twice, which makes one bucket zero-width.  It is
 * kept as-is because the table length must stay equal to SLOT_NUM.
 */
static int time_bin[] = {
	  0,  20,  40,  60,  80, 100, 120, 140, 160, 180,
	200, 220, 240, 260, 280, 300, 320, 340, 360, 380,
	400, 420, 440, 440, 460, 480, 500, 525, 550, 575,
	600, 650, 700, 800
};

/* Per-thread sample counts, one counter per bucket. */
static unsigned long time_slot[32][SLOT_NUM];



/*
 * Backing store for connection objects; created once in main() with elements
 * of sizeof(struct client_conn) + msg_size bytes.
 */
static struct mempool_datastore pp_conn_datastore;
/* Per-thread pool on top of the datastore; created in pp_main(). */
static __thread struct mempool pp_conn_pool;

/* Phase of the request/response exchange, stored in client_conn.mode. */
enum {
	CLIENT_MODE_RECV = 0,	/* request sent, reading the response */
	CLIENT_MODE_SEND = 1,	/* request (still) being transmitted */
};

/*
 * State of one client connection.  The ixev context is embedded so handlers
 * can recover the connection from the context pointer with container_of().
 * Field order is left untouched: data[] is a flexible array member and must
 * stay last.
 */
struct client_conn {
	/* index of this connection within its thread */
	int conn_id;
	/* rdtsc() value taken when the previous response completed */
    unsigned long long pp_time;
	/* rdtsc() value taken when the current response completed */
    unsigned long long pc_time;
	struct ixev_ctx ctx;
	/* remote endpoint passed to ixev_dial() */
	struct ip_tuple id;
	/* CLIENT_MODE_SEND or CLIENT_MODE_RECV */
	int mode;
	/* bytes of the current response received so far */
	size_t bytes_recvd;
	/* bytes of the current request handed to ixev_send_zc() so far */
	size_t bytes_sent;
	/* owning thread id; set to -1 once the connection has been closed */
	int thread_id;
	/* requests remaining before the connection is closed */
	unsigned int cnt;
	/* request payload, msg_size bytes are sent from here */
	char data[];
};

/* Number of entries in the tctx[] table. */
#define MAX_CPU 32

/* Per-thread bookkeeping; a pointer to one entry is passed to pp_main(). */
struct thread_ctx {
	int id;			/* thread index, assigned in main() */
	unsigned int num_conn;	/* currently open connections on this thread */
};

static struct thread_ctx tctx[MAX_CPU];


/*
 * Close a connection and drop it from its thread's connection count.
 *
 * The owning thread id is captured before anything else so that tctx[] is
 * never indexed with the -1 marker written at the end.  Calling this again
 * on a connection that was already closed is a no-op.
 */
static void client_die(struct client_conn *c)
{
	const int tid = c->thread_id;

	/* Already closed: nothing left to release or account for. */
	if (tid < 0)
		return;

	ixev_close(&c->ctx);
	tctx[tid].num_conn--;
	printf("thread: %d: close connection %u\n", tid, tctx[tid].num_conn);

	/* Mark the connection as no longer belonging to any thread. */
	c->thread_id = -1;
}

/*
 * Record one latency sample in the histogram of thread `core`.
 *
 * Bucket i covers time_bin[i]..time_bin[i+1] (both ends inclusive, first
 * match wins), for every pair of adjacent boundaries.  Samples above the
 * last boundary are counted in the final slot.
 *
 * @param time  latency sample, in the same unit as time_bin[]
 * @param core  thread index selecting the row of time_slot[]
 */
void assign_bin (unsigned long time, int core)
{
	int i;

	/* SLOT_NUM boundaries form SLOT_NUM - 1 buckets: indices 0..SLOT_NUM-2. */
	for (i = 0; i < SLOT_NUM - 1; i++) {
		if (time >= (unsigned long) time_bin[i] &&
		    time <= (unsigned long) time_bin[i + 1]) {
			time_slot[core][i]++;
			return;
		}
	}

	/* Overflow slot for samples beyond the last boundary. */
	if (time > (unsigned long) time_bin[SLOT_NUM - 1])
		time_slot[core][SLOT_NUM - 1]++;
}

/*
 * Linearly interpolate the latency at the LATENCY_PERCENT quantile.
 *
 * @param x0  lower boundary of the bucket
 * @param x1  upper boundary of the bucket
 * @param y0  cumulative fraction of samples at x0
 * @param y1  cumulative fraction of samples at x1
 * @return    x0 plus the share of (x1 - x0) needed to move the cumulative
 *            fraction from y0 to LATENCY_PERCENT
 *
 * The caller must pass y1 != y0; the division is not guarded.
 */
double calculate_latency(int x0, int x1, double y0, double y1)
{
	const double target = LATENCY_PERCENT;
	const double width = (double)(x1 - x0);
	const double offset = width * (target - y0) / (y1 - y0);

	return (double) x0 + offset;
}

/*
 * Print throughput and latency figures for one thread, at most once per
 * reporting interval.
 *
 * The function samples rdtsc() on every call; when more than 1,000,000
 * time units (TSC ticks / TICKS_PER_US) have passed since the last report
 * it prints three lines for the interval:
 *   - rps: responses completed, scaled to one second
 *   - avg: interval length divided by the responses per connection
 *   - 99%: LATENCY_PERCENT quantile, interpolated from the histogram
 * and then makes the current counters the baseline for the next interval.
 *
 * @param thread_id  index into the per-thread statistics arrays
 */
static void print_stats(int thread_id)
{
	const double percent = LATENCY_PERCENT;
	double packet_process_latency;
	double rps;
	unsigned long total_cnt;
	unsigned long pre_sum = 0;
	unsigned long sum = 0;
	uint64_t timetime;
	int i;

	total_cur_time[thread_id] = rdtsc();
	timetime = (total_cur_time[thread_id] - total_pre_time[thread_id]) / TICKS_PER_US;
	if (timetime <= 1000000)
		return;

	total_cnt = thread_count[thread_id] - prv_count[thread_id];

	/*
	 * Print from a local: the file-scope variable is shared by all threads
	 * and could be overwritten between the store and the printf.
	 */
	rps = (double) total_cnt * US_PER_SECOND / timetime;
	packet_process_speed = rps;
	printf("times: %d \t Thread: %d \trps: %.2f\n", times[thread_id], thread_id, rps);

	/*
	 * Average time per response on one connection.  Computed in floating
	 * point so that fewer responses than connections (or a non-multiple)
	 * neither truncates nor divides by zero.
	 */
	if (total_cnt > 0)
		packet_process_latency = (double) timetime * nconn / total_cnt;
	else
		packet_process_latency = 0.0;

	printf("times: %d \t Thread: %d \tavg: %.2f\n", times[thread_id], thread_id, packet_process_latency); //average latency

	/*
	 * Walk the buckets, accumulating this interval's samples, until the
	 * running total reaches the requested quantile.  pre_sum keeps the
	 * total just below the bucket where that happens.
	 */
	for (i = 0; i < SLOT_NUM - 1; i++) {
		sum += time_slot[thread_id][i] - pre_num[thread_id][i];
		pre_num[thread_id][i] = time_slot[thread_id][i];

		if (sum >= total_cnt * percent)
			break;
		pre_sum = sum;
	}

	if (total_cnt == 0)
		packet_process_latency = 0.0;
	else if (i != SLOT_NUM - 1)
		packet_process_latency = calculate_latency(time_bin[i], time_bin[i + 1],
							   (double) pre_sum / total_cnt,
							   (double) sum / total_cnt);
	else
		/* Quantile lies beyond the last boundary: report the boundary. */
		packet_process_latency = time_bin[SLOT_NUM - 1];

	/* "%%" prints a literal percent sign. */
	printf("times: %d \t Thread: %d \t99%%: %.2f\n", times[thread_id], thread_id, packet_process_latency); //99% latency
	times[thread_id]++;

	/* Snapshot the buckets the loop above did not reach. */
	for ( ; i < SLOT_NUM; i++)
		pre_num[thread_id][i] = time_slot[thread_id][i];

	total_pre_time[thread_id] = total_cur_time[thread_id];
	prv_count[thread_id] = thread_count[thread_id];
}

/*static void print_latency(int thread_id, int conn_id)
{
	if (num_resp[thread_id][conn_id] - prv_num_resp[thread_id][conn_id] > print_size2 / msg_size) {

		cur_time[thread_id][conn_id] = rdtsc();
		uint64_t timetime = (cur_time[thread_id][conn_id]- pre_time[thread_id][conn_id]) /TICKS_PER_US;

		packet_process_latency = (double)(timetime / (num_resp[thread_id][conn_id] - prv_num_resp[thread_id][conn_id])) ;
		printf("[Latency]\tThread: %d-%d \tus: %.2f\n", thread_id, conn_id, packet_process_latency);

		if (2000000 > timetime ) {
			print_size1 = print_size2 * 2;
		}
		else if (4000000 < timetime ) {
			print_size1= print_size2 / 2;
		}
		pre_time[thread_id][conn_id] = cur_time[thread_id][conn_id];
		prv_num_resp[thread_id][conn_id] = num_resp[thread_id][conn_id];
	}
}*/


/*
 * Event handler driving one connection through alternating send and
 * receive phases.
 *
 * SEND: pushes the remaining request bytes with ixev_send_zc(); once all
 * msg_size bytes are out it switches to RECV.
 * RECV: drains the response into a scratch buffer (the contents are
 * discarded); once msg_size bytes have arrived it counts the response,
 * records the time since the previous response in the latency histogram,
 * prints statistics if due, and switches back to SEND.
 *
 * The handler returns when a call reports -EAGAIN or transfers only part of
 * what was asked for, when an error closes the connection, or when the
 * connection has issued its last request.
 *
 * @param ctx     context embedded in a struct client_conn
 * @param reason  event mask that triggered the call (unused)
 */
static void main_handler(struct ixev_ctx *ctx, unsigned int reason)
{
	struct client_conn *c = container_of(ctx, struct client_conn, ctx);
	char tbuf[4000];
	ssize_t ret;

	(void) reason;

	while (1) {
		if (c->mode == CLIENT_MODE_SEND) {
			ret = ixev_send_zc(ctx, &c->data[c->bytes_sent],
					   msg_size - c->bytes_sent);
			if (ret <= 0) {
				if (ret != -EAGAIN)
					client_die(c);
				return;
			}

			c->bytes_sent += ret;
			if (c->bytes_sent < msg_size) {
				printf("why  why---\n");
				return;
			}

			c->bytes_recvd = 0;
			ixev_set_handler(ctx, IXEVIN, &main_handler);
			c->mode = CLIENT_MODE_RECV;
		} else {
			/* Captured now: client_die() overwrites c->thread_id. */
			const int tid = c->thread_id;
			size_t want = msg_size - c->bytes_recvd;
			unsigned long time_interval;
			int finished;

			/* Never ask for more than the scratch buffer can hold. */
			if (want > sizeof(tbuf))
				want = sizeof(tbuf);

			ret = ixev_recv(ctx, tbuf, want);
			if (ret <= 0) {
				if (ret != -EAGAIN)
					client_die(c);
				return;
			}

			c->bytes_recvd += ret;
			if (c->bytes_recvd < msg_size) {
				/*
				 * A full scratch buffer only means the read was
				 * capped above; go around again for the rest.
				 */
				if ((size_t) ret == want)
					continue;
				printf("why god why---\n");
				return;
			}

			thread_count[tid]++;
			c->pc_time = rdtsc();
			time_interval = (c->pc_time - c->pp_time) / TICKS_PER_US;
			c->pp_time = c->pc_time;
			assign_bin(time_interval, tid);

			finished = (--c->cnt == 0);
			if (finished)
				client_die(c);

			print_stats(tid);

			/* The connection is closed; do not start another request. */
			if (finished)
				return;

			c->bytes_sent = 0;
			ixev_set_handler(ctx, IXEVOUT, &main_handler);
			c->mode = CLIENT_MODE_SEND;
		}
	}
}

/*
 * Accept callback for the ixev connection ops.  This program only dials out,
 * so no context is ever provided: the function always returns NULL.
 * NOTE(review): presumably a NULL return makes ixev refuse the inbound
 * connection -- confirm against the ixev API.
 */
static struct ixev_ctx *client_accept(struct ip_tuple *id)
{
	(void) id;

	return NULL;
}

/*
 * Release callback for the ixev connection ops: hands the connection object
 * that embeds `ctx` back to the calling thread's pp_conn_pool.
 */
static void client_release(struct ixev_ctx *ctx)
{
	mempool_free(&pp_conn_pool, container_of(ctx, struct client_conn, ctx));
}

/*
 * Dial-completion callback for the ixev connection ops.
 *
 * Puts the connection into its initial SEND state, starts the latency clock,
 * counts the connection on its thread and immediately begins the first
 * request by invoking main_handler().
 *
 * @param ctx  context embedded in a struct client_conn
 * @param ret  dial result; non-zero is reported on stderr
 *
 * NOTE(review): a failed dial is only logged and the exchange is started
 * anyway, as before.  Whether ixev expects the context to be closed or
 * released here could not be determined from this file -- confirm.
 */
static void client_dialed(struct ixev_ctx *ctx, long ret)
{
	struct client_conn *c = container_of(ctx, struct client_conn, ctx);

	if (ret)
		fprintf(stderr, "failed to connect, ret = %ld\n", ret);

	c->mode = CLIENT_MODE_SEND;
	c->bytes_sent = 0;
	c->bytes_recvd = 0;

	/*
	 * main_handler() measures each response against pp_time; seed it here
	 * so the first sample is not computed from an indeterminate value.
	 */
	c->pp_time = rdtsc();
	c->pc_time = c->pp_time;

	tctx[c->thread_id].num_conn++;

	printf("thread: %d: new connection %u\n", c->thread_id, tctx[c->thread_id].num_conn);
	fflush(stdout);

	ixev_set_handler(ctx, IXEVOUT, &main_handler);
	main_handler(&c->ctx, IXEVOUT);
}

/* Connection callbacks registered with ixev_init() in main(). */
struct ixev_conn_ops stream_conn_ops = {
	.dialed		= client_dialed,
	.release	= client_release,
	.accept		= client_accept,
};

/*
 * Parse a dotted-quad IPv4 address.
 *
 * Each octet is read as at most three digits into an unsigned int and then
 * range-checked, so values above 255 are rejected instead of being silently
 * truncated to a byte.
 *
 * @param str   text of the form "a.b.c.d"
 * @param addr  receives MAKE_IP_ADDR(a, b, c, d) on success
 * @return      0 on success, -EINVAL if fewer than four octets could be read
 *              or any octet is larger than 255
 */
static int parse_ip_addr(const char *str, uint32_t *addr)
{
	unsigned int a, b, c, d;

	if (sscanf(str, "%3u.%3u.%3u.%3u", &a, &b, &c, &d) != 4)
		return -EINVAL;

	if (a > 255 || b > 255 || c > 255 || d > 255)
		return -EINVAL;

	*addr = MAKE_IP_ADDR(a, b, c, d);
	return 0;
}

static void *pp_main(void *arg)
{
	int ret, i;
	struct thread_ctx *tctx = (struct thread_ctx *) arg;
	uint64_t cur_time;

	tctx->num_conn = 0;

	printf("thread %d started\n", tctx->id);

	ret = ixev_init_thread();
	if (ret) {
		fprintf(stderr, "unable to init IXEV\n");
		exit(ret);
	}

	ret = mempool_create(&pp_conn_pool, &pp_conn_datastore);
	if (ret) {
		fprintf(stderr, "unable to create mempool\n");
		return NULL;
	}

	cur_time = rdtsc();
    total_pre_time[tctx->id] =  cur_time;

	for (i = 0; i < nconn; i++) {
		struct client_conn *c = malloc(sizeof(struct client_conn));
		//c= mempool_alloc(&pp_conn_pool);
		if (!c) {
			fprintf(stderr, "unable to malloc\n");
			exit(-1);
		}
		c->conn_id = i;
		c->id.dst_port = dst_port;
		c->id.dst_ip = dst_ip;
		c->thread_id = tctx->id;
		c->cnt = nrequests;

		pre_time[c->thread_id][c->conn_id] = cur_time;

		/*c->data = malloc(msg_size);

		if (!c->data) {
			fprintf(stderr, "unable to malloc\n");
			exit(-1);
		}*/

		//sprintf(c->data, "id:%p,conn:%d\n", arg, i);

		ixev_dial(&c->ctx, &c->id);
	}

	while (1)
		ixev_wait();

	return 0;
}

int main(int argc, char *argv[])
{
    int i, j, ret, nr_cpu;
	int flags;
    pthread_t tid;
	unsigned int pp_conn_pool_entries;

	pp_conn_pool_entries = 16 * 4096;

	if (argc < 4) {
		fprintf(stderr, "Usage: IP PORT MSG_SIZE [NUM_CONN] [NUM_MSG]\n");
		return -1;
	}

	if (parse_ip_addr(argv[1], &dst_ip)) {
		fprintf(stderr, "Bad IP address '%s'", argv[1]);
		exit(1);
	}

	dst_port = atoi(argv[2]);
	msg_size = atoi(argv[3]);

	if (argc >= 5)
		nconn = atoi(argv[4]);
	nrequests = 10000;
	if (argc == 6)
		nrequests = atoi(argv[5]);

	ixev_init(&stream_conn_ops);

	ret = mempool_create_datastore(&pp_conn_datastore, pp_conn_pool_entries,
								   sizeof(struct client_conn) + msg_size,
								   0, MEMPOOL_DEFAULT_CHUNKSIZE, "pp_conn");
	if (ret) {
		fprintf(stderr, "unable to create mempool\n");
		return ret;
	}

	flags = fcntl(STDIN_FILENO, F_GETFL, 0);
	fcntl(STDIN_FILENO, F_SETFL, flags | O_NONBLOCK);

	nr_cpu = sys_nrcpus();
	if (nr_cpu < 1) {
		fprintf(stderr, "got invalid cpu count %d\n", nr_cpu);
		exit(-1);
	}
	nr_cpu--; /* don't count the main thread */

	sys_spawnmode(true);

    for(j = 0; j < 32; j++)
    {
        times[j] = 0;
        total_pre_time[j] = 0;
        for(i = 0; i < SLOT_NUM; i++)
        {
              pre_num[j][i] = 0;
        }
    }
	for (i = 0; i < nr_cpu; i++) {
		tctx[i].id = i;
		if (pthread_create(&tid, NULL, pp_main, &tctx[i])) {
			fprintf(stderr, "failed to spawn thread %d\n", i);
			exit(-1);
        }
	}

	tctx[i].id = i;
	pp_main(&tctx[i]);

	return 0;
}
